airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: removing unnecessary static map in scp input/output handlers
Date Wed, 08 Oct 2014 18:11:38 GMT
Repository: airavata
Updated Branches:
  refs/heads/master a4c186c15 -> 3b63a2886


removing unnecessary static map in scp input/output handlers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3b63a288
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3b63a288
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3b63a288

Branch: refs/heads/master
Commit: 3b63a288695efec8a6a0cacbb478983f49d98963
Parents: a4c186c
Author: lahiru <lahiru@apache.org>
Authored: Wed Oct 8 14:10:28 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Wed Oct 8 14:10:28 2014 -0400

----------------------------------------------------------------------
 .../gfac/gsissh/util/GFACGSISSHUtils.java       |  20 +-
 .../ssh/handler/AdvancedSCPInputHandler.java    | 188 +++++++------------
 .../ssh/handler/AdvancedSCPOutputHandler.java   |  37 +---
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    |  58 ++++--
 4 files changed, 122 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/3b63a288/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index af01147..997f47e 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -95,16 +95,18 @@ public class GFACGSISSHUtils {
                             clusters.get(key).remove(i);
                             recreate = true;
                         }
-                        try {
-                            pbsCluster.listDirectory("~/"); // its hard to trust isConnected
method, so we try to connect if it works we are good,else we recreate
-                        } catch (Exception e) {
-                            clusters.get(key).remove(i);
-                            logger.info("Connection found the connection map is expired,
so we create from the scratch");
-                            maxClusterCount++;
-                            recreate = true; // we make the pbsCluster to create again if
there is any exception druing connection
+                        if(!recreate) {
+                            try {
+                                pbsCluster.listDirectory("~/"); // its hard to trust isConnected
method, so we try to connect if it works we are good,else we recreate
+                            } catch (Exception e) {
+                                clusters.get(key).remove(i);
+                                logger.info("Connection found the connection map is expired,
so we create from the scratch");
+                                maxClusterCount++;
+                                recreate = true; // we make the pbsCluster to create again
if there is any exception druing connection
+                            }
+                            logger.info("Re-using the same connection used with the connection
string:" + key);
+                            context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(),
requestData, pbsCluster);
                         }
-                        logger.info("Re-using the same connection used with the connection
string:" + key);
-                        context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(),
requestData, pbsCluster);
                     } else {
                         recreate = true;
                     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/3b63a288/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index a0abe7f..9d94814 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -85,8 +85,6 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler
{
 
     private String inputPath;
 
-    public static Map<String, Cluster> clusters = new HashMap<String, Cluster>();
-
     public void initProperties(Properties properties) throws GFacHandlerException {
         password = (String) properties.get("password");
         passPhrase = (String) properties.get("passPhrase");
@@ -105,7 +103,7 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler
{
         MessageContext inputNew = new MessageContext();
         StringBuffer data = new StringBuffer("|");
         Cluster pbsCluster = null;
-        
+
         try {
             String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
             if (pluginData != null) {
@@ -128,23 +126,14 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler
{
                 } catch (ApplicationSettingsException e) {
                     log.error(e.getMessage());
                     try {
-         				GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT,
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-         			} catch (GFacException e1) {
-         				 log.error(e1.getLocalizedMessage());
-         			}
+                        GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(),
CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    } catch (GFacException e1) {
+                        log.error(e1.getLocalizedMessage());
+                    }
                     throw new GFacHandlerException("Error while creating SSHSecurityContext",
e, e.getLocalizedMessage());
                 }
             }
-            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
-                    .getApplicationDeploymentDescription().getType();
-
-            AuthenticationInfo authenticationInfo = null;
-            if (password != null) {
-                authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
-            } else {
-                authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath,
this.privateKeyPath,
-                        this.passPhrase);
-            }
+            pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
             // Server info
             String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID()
+ File.separator + jobExecutionContext.getTaskData().getTaskID();
             if (index < oldIndex) {
@@ -159,113 +148,72 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler
{
             TransferStatus status = new TransferStatus();
             // here doesn't matter what the job manager is because we are only doing some
file handling
             // not really dealing with monitoring or job submission, so we pa
-            String lastHost = null;
-            
-			MessageContext input = jobExecutionContext.getInMessageContext();
-			Set<String> parameters = input.getParameters().keySet();
-			for (String paramName : parameters) {
-				ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
-				String paramValue = MappingFactory.toString(actualParameter);
-				// TODO: Review this with type
-				if ("URI".equals(actualParameter.getType().getType().toString())) {
-					try {
-						URL file = new URL(paramValue);
-						this.userName = file.getUserInfo();
-						this.hostName = file.getHost();
-						paramValue = file.getPath();
-					} catch (MalformedURLException e) {
-						log.error(e.getLocalizedMessage(),e);
-					}
-					 ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
-					 String key = this.userName + this.hostName;
-			            boolean recreate = false;
-			            if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected())
{
-			                pbsCluster = (PBSCluster) clusters.get(key);
-			                try {
-			                    pbsCluster.listDirectory("~/"); // its hard to trust isConnected method,
so we try to connect if it works we are good,else we recreate
-			                	log.info("Reusing existing connection for ---- : " + this.hostName);
-			                } catch (Exception e) {
-			                    log.info("Connection found the connection map is expired, so we create
from the scratch");
-			                    recreate = true; // we make the pbsCluster to create again if there
is any exception druing connection
-			                }
-			            } else{
-			            	recreate = true;
-			            }
-			            if(recreate){
-			            pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
-			            log.info("Connection created for ---- : " + this.hostName);
-			            clusters.put(key, pbsCluster);
-			            }
-			            if (index < oldIndex) {
-						log.info("Input File: " + paramValue + " is already transfered, so we skip this operation
!!!");
-						((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
-						data.append(oldFiles.get(index++)).append(","); // we get already transfered file and
increment the index
-					} else {
-						String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
-						((URIParameterType) actualParameter.getType()).setValue(stageInputFile);
-						StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
-						status.setTransferState(TransferState.UPLOAD);
-						detail.setTransferStatus(status);
-						detail.setTransferDescription("Input Data Staged: " + stageInputFile);
-						registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
 
-						GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-					}
-				} else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-					List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
-					List<String> newFiles = new ArrayList<String>();
-					for (String paramValueEach : split) {
-						try {
-							URL file = new URL(paramValue);
-							this.userName = file.getUserInfo();
-							this.hostName = file.getHost();
-							paramValueEach = file.getPath();
-						} catch (MalformedURLException e) {
-							log.error(e.getLocalizedMessage(),e);
-						}
-						ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
-						String key = this.userName + this.hostName;
-			            boolean recreate = false;
-			            if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected())
{
-			                pbsCluster = (PBSCluster) clusters.get(key);
-			                try {
-			                    pbsCluster.listDirectory("~/"); // its hard to trust isConnected method,
so we try to connect if it works we are good,else we recreate
-			                	log.info("Reusing existing connection for ---- : " + this.hostName);
-			                } catch (Exception e) {
-			                    log.info("Connection found the connection map is expired, so we create
from the scratch");
-			                    recreate = true; // we make the pbsCluster to create again if there
is any exception druing connection
-			                }
-			            } else{
-			            	recreate = true;
-			            }
-			            if(recreate){
-			            pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
-			            log.info("Connection created for ---- : " + this.hostName);
-			            clusters.put(key, pbsCluster);
-			            }
-			         
-						if (index < oldIndex) {
-							log.info("Input File: " + paramValue + " is already transfered, so we skip this operation
!!!");
-							newFiles.add(oldFiles.get(index));
-							data.append(oldFiles.get(index++)).append(",");
-						} else {
-							String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
-							StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
-							GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
-							newFiles.add(stageInputFiles);
-						}
-					}
-					((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
-				}
-				inputNew.getParameters().put(paramName, actualParameter);
-			}
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+                String paramValue = MappingFactory.toString(actualParameter);
+                // TODO: Review this with type
+                if ("URI".equals(actualParameter.getType().getType().toString())) {
+                    try {
+                        URL file = new URL(paramValue);
+                        this.userName = file.getUserInfo();
+                        this.hostName = file.getHost();
+                        paramValue = file.getPath();
+                    } catch (MalformedURLException e) {
+                        log.error(e.getLocalizedMessage(), e);
+                    }
+
+                    if (index < oldIndex) {
+                        log.info("Input File: " + paramValue + " is already transfered, so
we skip this operation !!!");
+                        ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+                        data.append(oldFiles.get(index++)).append(","); // we get already
transfered file and increment the index
+                    } else {
+                        String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
+                        ((URIParameterType) actualParameter.getType()).setValue(stageInputFile);
+                        StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+                        status.setTransferState(TransferState.UPLOAD);
+                        detail.setTransferStatus(status);
+                        detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+                        GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index),
this.getClass().getName());
+                    }
+                } else if ("URIArray".equals(actualParameter.getType().getType().toString()))
{
+                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+                    List<String> newFiles = new ArrayList<String>();
+                    for (String paramValueEach : split) {
+                        try {
+                            URL file = new URL(paramValue);
+                            this.userName = file.getUserInfo();
+                            this.hostName = file.getHost();
+                            paramValueEach = file.getPath();
+                        } catch (MalformedURLException e) {
+                            log.error(e.getLocalizedMessage(), e);
+                        }
+                        if (index < oldIndex) {
+                            log.info("Input File: " + paramValue + " is already transfered,
so we skip this operation !!!");
+                            newFiles.add(oldFiles.get(index));
+                            data.append(oldFiles.get(index++)).append(",");
+                        } else {
+                            String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach,
parentPath);
+                            StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+                            GFacUtils.savePluginData(jobExecutionContext, temp.insert(0,
++index), this.getClass().getName());
+                            newFiles.add(stageInputFiles);
+                        }
+                    }
+                    ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new
String[newFiles.size()]));
+                }
+                inputNew.getParameters().put(paramName, actualParameter);
+            }
         } catch (Exception e) {
             log.error(e.getMessage());
             try {
- 				GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT,
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- 			} catch (GFacException e1) {
- 				 log.error(e1.getLocalizedMessage());
- 			}
+                GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(),
CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
             throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
         }
         jobExecutionContext.setInMessageContext(inputNew);

http://git-wip-us.apache.org/repos/asf/airavata/blob/3b63a288/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
index b1e03b2..1f16206 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -32,12 +32,10 @@ import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
 import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
 import org.apache.airavata.model.workspace.experiment.DataObjectType;
 import org.apache.airavata.model.workspace.experiment.DataType;
@@ -51,7 +49,6 @@ import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -88,9 +85,6 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
 
     private String outputPath;
     
-    public static Map<String, Cluster> clusters = new HashMap<String, Cluster>();
-    
-    
 
     public void initProperties(Properties properties) throws GFacHandlerException {
         password = (String)properties.get("password");
@@ -119,19 +113,12 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
                     throw new GFacHandlerException("Error while creating SSHSecurityContext",
e, e.getLocalizedMessage());
                 }
             }
+            pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
             ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
                     .getApplicationDeploymentDescription().getType();
             String standardError = app.getStandardError();
             String standardOutput = app.getStandardOutput();
-            String outputDataDirectory = app.getOutputDataDirectory();
             super.invoke(jobExecutionContext);
-            AuthenticationInfo authenticationInfo = null;
-            if (password != null) {
-                authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
-            } else {
-                authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath,
this.privateKeyPath,
-                        this.passPhrase);
-            }
             // Server info
             if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null
&& jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir()
!= null){
             	try{
@@ -143,30 +130,10 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
 					log.error(e.getLocalizedMessage(),e);
 				}
             }
-            ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
-            String key = this.userName + this.hostName;
-            boolean recreate = false;
-            if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected())
{
-                pbsCluster = (PBSCluster) clusters.get(key);
-                try {
-                    pbsCluster.listDirectory("~/"); // its hard to trust isConnected method,
so we try to connect if it works we are good,else we recreate
-                	log.info("Reusing existing connection for ---- : " + this.hostName);
-                } catch (Exception e) {
-                    log.info("Connection found the connection map is expired, so we create
from the scratch");
-                    recreate = true; // we make the pbsCluster to create again if there is
any exception druing connection
-                }
-            } else{
-            	recreate = true;
-            }
-            if(recreate){
-            pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
-            log.info("Connection created for ---- : " + this.hostName);
-            clusters.put(key, pbsCluster);
-            }
             if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null
&& !jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){
             outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID()
+ "-" + jobExecutionContext.getTaskData().getTaskID()
                     + File.separator;
-            pbsCluster.makeDirectory(outputPath);
+                pbsCluster.makeDirectory(outputPath);
             }
             pbsCluster.scpTo(outputPath, standardError);
             pbsCluster.scpTo(outputPath, standardOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/3b63a288/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index a31eac1..646a375 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -40,6 +40,7 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
 import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
@@ -56,7 +57,10 @@ import java.util.*;
 public class GFACSSHUtils {
     private final static Logger logger = LoggerFactory.getLogger(GFACSSHUtils.class);
 
-    public static Map<String, Cluster> clusters = new HashMap<String, Cluster>();
+    public static Map<String, List<Cluster>> clusters = new HashMap<String,
List<Cluster>>();
+
+    public static int maxClusterCount = 5;
+
 
     public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws
GFacException, ApplicationSettingsException {
         HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription();
@@ -87,23 +91,43 @@ public class GFACSSHUtils {
                 String key = credentials.getPortalUserName() + registeredHost.getType().getHostAddress()
+
                         serverInfo.getPort();
                 boolean recreate = false;
-                if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected())
{
-                    pbsCluster = clusters.get(key);
-                    try {
-                        pbsCluster.listDirectory("~/"); // its hard to trust isConnected
method, so we try to connect if it works we are good,else we recreate
-                        // its hard to trust isConnected method, so we try to connect if
it works we are good,else we recreate
-                    }catch(Exception e){
-                        logger.info("Connection found the connection map is expired, so we
create from the scratch");
-                        recreate = true; // we make the pbsCluster to create again if there
is any exception druing connection
+                synchronized (clusters) {
+                    if (clusters.containsKey(key) && clusters.get(key).size() <
maxClusterCount) {
+                        recreate = true;
+                    } else if (clusters.containsKey(key)) {
+                        int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+                        if (clusters.get(key).get(i).getSession().isConnected()) {
+                            pbsCluster = clusters.get(key).get(i);
+                        } else {
+                            clusters.get(key).remove(i);
+                            recreate = true;
+                        }
+                        if(!recreate) {
+                            try {
+                                pbsCluster.listDirectory("~/"); // its hard to trust isConnected
method, so we try to connect if it works we are good,else we recreate
+                            } catch (Exception e) {
+                                clusters.get(key).remove(i);
+                                logger.info("Connection found the connection map is expired,
so we create from the scratch");
+                                maxClusterCount++;
+                                recreate = true; // we make the pbsCluster to create again
if there is any exception druing connection
+                            }
+                        }
+                        logger.info("Re-using the same connection used with the connection
string:" + key);
+                    } else {
+                        recreate = true;
+                    }
+                    if (recreate) {
+                        pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo,
+                                CommonUtils.getPBSJobManager(installedParentPath));
+                        List<Cluster> pbsClusters = null;
+                        if (!(clusters.containsKey(key))) {
+                            pbsClusters = new ArrayList<Cluster>();
+                        } else {
+                            pbsClusters = clusters.get(key);
+                        }
+                        pbsClusters.add(pbsCluster);
+                        clusters.put(key, pbsClusters);
                     }
-                }else{
-                    recreate = true;
-                }
-
-                if(recreate) {
-                    pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo,
-                            CommonUtils.getPBSJobManager(installedParentPath));
-                    clusters.put(key, pbsCluster);
                 }
             } catch (Exception e) {
                 e.printStackTrace();  //To change body of catch statement use File | Settings
| File Templates.


Mime
View raw message