airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/2] git commit: stabilizing the code to handle large load
Date Wed, 08 Oct 2014 16:12:52 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 57adb55db -> 7de087118


Stabilize the code to handle large loads


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/42219ba5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/42219ba5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/42219ba5

Branch: refs/heads/master
Commit: 42219ba5f629f8cc81a262f944a9fae4f257c801
Parents: dbb1c97
Author: lahiru <lahiru@apache.org>
Authored: Tue Oct 7 17:06:16 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Tue Oct 7 17:06:16 2014 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  | 20 +++--
 .../airavata/gfac/server/GfacServerHandler.java |  6 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  3 -
 .../gfac/core/utils/GFacThreadPoolExecutor.java |  7 ++
 .../gfac/gsissh/util/GFACGSISSHUtils.java       | 87 +++++++++++++-------
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 42 +++++-----
 .../airavata/gfac/monitor/util/CommonUtils.java | 22 +++--
 .../server/OrchestratorServerHandler.java       |  4 +-
 tools/gsissh/pom.xml                            | 12 +--
 .../java/com/jcraft/jsch/ExtendedSession.java   |  2 +
 .../com/jcraft/jsch/GSISSHIdentityFile.java     |  3 +-
 .../airavata/gsi/ssh/api/CommandExecutor.java   | 10 +--
 .../gsi/ssh/impl/GSISSHAbstractCluster.java     | 30 +++++--
 .../gsi/ssh/impl/StandardOutReader.java         |  3 -
 .../apache/airavata/gsi/ssh/util/SSHUtils.java  | 12 +--
 15 files changed, 161 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 8d3ef3b..7b7557f 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -46,9 +46,9 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
-    private static String echoAppId = "Echo_89831769-edf5-4f27-a8c9-fe0ef96fd355";
+    private static String echoAppId = "Echo_8d137226-e59e-4a23-a1ce-385b670ae116";
     private static String wrfAppId = "WRF_15ae6599-a48f-4134-95b8-98e109ac6f88";
-    private static String amberAppId = "Amber_a7b18a3a-31b3-4dc7-8faf-7c3144f14201";
+    private static String amberAppId = "Amber_ecb03042-4269-4e90-9cf9-99588b183123";
 
     private static String localHost = "localhost";
     private static String trestlesHostName = "trestles.sdsc.xsede.org";
@@ -60,10 +60,14 @@ public class CreateLaunchExperiment {
                 airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST,
THRIFT_SERVER_PORT);
                 System.out.println("API version is " + airavataClient.getAPIVersion());
 //            registerApplications(); // run this only the first time
-//                for (int i = 0; i < 100; i++) {
+                int requestCount = 0;
+                while(true){
+                for (int i = 0; i < 20; i++) {
 //            final String expId = createExperimentForSSHHost(airavata);
-                    final String expId = createEchoExperimentForTrestles(airavataClient);
-//            final String expId = createEchoExperimentForStampede(airavataClient);
+//                    final String expId = createEchoExperimentForTrestles(airavataClient);
+                    final String expId = createEchoExperimentForStampede(airavataClient);
+                    requestCount++;
+                    System.out.println("Sending request:"+requestCount);
 //            final String expId = createExperimentEchoForLocalHost(airavataClient);
 //            final String expId = createExperimentWRFTrestles(airavataClient);
 //            final String expId = createExperimentForBR2(airavataClient);
@@ -72,10 +76,12 @@ public class CreateLaunchExperiment {
 //            final String expId = createExperimentForStampedeAmber(airavataClient);
 //            final String expId = createExperimentForTrestlesAmber(airavataClient);
 
-            System.out.println("Experiment ID : " + expId);
+//            System.out.println("Experiment ID : " + expId);
 //            updateExperiment(airavata, expId);
                     launchExperiment(airavataClient, expId);
-//                }
+                }
+                    Thread.sleep(300000);
+                }
             } catch (Exception e) {
                 logger.error("Error while connecting with server", e.getMessage());
                 e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 11c33ec..3ad6e3e 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -196,11 +196,12 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
     public boolean submitJob(String experimentId, String taskId, String gatewayId) throws
TException {
         logger.infoId(experimentId, "GFac Received submit jog request for the Experiment:
{} TaskId: {}", experimentId, taskId);
         GFac gfac = getGfac();
-//        InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId,
taskId, gatewayId);
+        InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId,
taskId, gatewayId);
         try {
             if( gfac.submitJob(experimentId, taskId, gatewayId)){
                 logger.debugId(experimentId, "Submitted jog to the Gfac Implementation, experiment
{}, task {}, gateway " +
-                        "{}", experimentId, taskId, gatewayId);
+                "{}", experimentId, taskId, gatewayId);
+//                 inHandlerFutures.add(GFacThreadPoolExecutor.getFixedThreadPool().submit(inputHandlerWorker));
                 return true;
             }else{
                 logger.error(experimentId, "Failed to submit job to the GFac implementation,
experiment {}, task {}, " +
@@ -210,7 +211,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         } catch (GFacException e) {
             throw new TException("Error launching the experiment : " + e.getMessage(), e);
         }
-//        inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker));
     }
 
     public boolean cancelJob(String experimentId, String taskId) throws TException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 1ed3a67..8ac35cc 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -26,8 +26,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.xpath.XPathExpressionException;
@@ -57,7 +55,6 @@ import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.monitor.JobIdentity;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-//import org.apache.airavata.api.server.listener.ExperimentStatusChangedEvent;
 import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
 import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
index 56a97a5..87e4618 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
@@ -32,4 +32,11 @@ public class GFacThreadPoolExecutor {
         }
         return cachedThreadPool;
     }
+
+    public static ExecutorService getFixedThreadPool() {
+        if(cachedThreadPool==null){
+            cachedThreadPool = Executors.newFixedThreadPool(500);
+        }
+        return cachedThreadPool;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 2de1f2f..af01147 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -56,6 +56,8 @@ import org.apache.airavata.schemas.gfac.UnicoreHostType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.validation.constraints.Max;
+
 
 public class GFACGSISSHUtils {
     private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class);
@@ -63,7 +65,8 @@ public class GFACGSISSHUtils {
     public static final String PBS_JOB_MANAGER = "pbs";
     public static final String SLURM_JOB_MANAGER = "slurm";
     public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
-    public static Map<String, Cluster> clusters = new HashMap<String, Cluster>();
+    public static int maxClusterCount = 5;
+    public static Map<String, List<Cluster>> clusters = new HashMap<String,
List<Cluster>>();
     public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws
GFacException, ApplicationSettingsException {
         HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription();
         if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType()
instanceof UnicoreHostType
@@ -81,42 +84,62 @@ public class GFACGSISSHUtils {
                 String key = requestData.getMyProxyUserName() + registeredHost.getType().getHostAddress()
+
                         gsisshHostType.getPort();
                 boolean recreate = false;
-                if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected())
{
-                    pbsCluster = (PBSCluster) clusters.get(key);
-                    try {
-                        pbsCluster.listDirectory("~/"); // its hard to trust isConnected
method, so we try to connect if it works we are good,else we recreate
-                    } catch (Exception e) {
-                        logger.info("Connection found the connection map is expired, so we
create from the scratch");
-                        recreate = true; // we make the pbsCluster to create again if there
is any exception druing connection
+                synchronized (clusters) {
+                    if (clusters.containsKey(key) && clusters.get(key).size() <
maxClusterCount) {
+                        recreate = true;
+                    } else if (clusters.containsKey(key)) {
+                        int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+                        if (clusters.get(key).get(i).getSession().isConnected()) {
+                            pbsCluster = (PBSCluster) clusters.get(key).get(i);
+                        } else {
+                            clusters.get(key).remove(i);
+                            recreate = true;
+                        }
+                        try {
+                            pbsCluster.listDirectory("~/"); // its hard to trust isConnected
method, so we try to connect if it works we are good,else we recreate
+                        } catch (Exception e) {
+                            clusters.get(key).remove(i);
+                            logger.info("Connection found the connection map is expired,
so we create from the scratch");
+                            maxClusterCount++;
+                            recreate = true; // we make the pbsCluster to create again if
there is any exception druing connection
+                        }
+                        logger.info("Re-using the same connection used with the connection
string:" + key);
+                        context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(),
requestData, pbsCluster);
+                    } else {
+                        recreate = true;
                     }
-                    logger.info("Re-using the same connection used with the connection string:"
+ key);
-                    context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(),
requestData, pbsCluster);
-                } else {
-                    recreate = true;
-                }
-                if(recreate) {
-                    ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(),
registeredHost.getType().getHostAddress(),
-                            gsisshHostType.getPort());
 
-                    JobManagerConfiguration jConfig = null;
-                    String installedParentPath = ((HpcApplicationDeploymentType)
-                            jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
-                    String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager();
-                    if (jobManager == null) {
-                        logger.error("No Job Manager is configured, so we are picking pbs
as the default job manager");
-                        jConfig = CommonUtils.getPBSJobManager(installedParentPath);
-                    } else {
-                        if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                    if (recreate) {
+                        ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(),
registeredHost.getType().getHostAddress(),
+                                gsisshHostType.getPort());
+
+                        JobManagerConfiguration jConfig = null;
+                        String installedParentPath = ((HpcApplicationDeploymentType)
+                                jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
+                        String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager();
+                        if (jobManager == null) {
+                            logger.error("No Job Manager is configured, so we are picking
pbs as the default job manager");
                             jConfig = CommonUtils.getPBSJobManager(installedParentPath);
-                        } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
-                            jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
-                        } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager))
{
-                            jConfig = CommonUtils.getSGEJobManager(installedParentPath);
+                        } else {
+                            if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                            } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+                            } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager))
{
+                                jConfig = CommonUtils.getSGEJobManager(installedParentPath);
+                            }
+                        }
+                        pbsCluster = new PBSCluster(serverInfo, tokenizedMyProxyAuthInfo,
jConfig);
+                        context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(),
requestData, pbsCluster);
+                        List<Cluster> pbsClusters = null;
+                        if (!(clusters.containsKey(key))) {
+                            pbsClusters = new ArrayList<Cluster>();
+                        } else {
+                            pbsClusters = clusters.get(key);
                         }
+                        pbsClusters.add(pbsCluster);
+                        clusters.put(key, pbsClusters);
                     }
-                    pbsCluster = new PBSCluster(serverInfo, tokenizedMyProxyAuthInfo, jConfig);
-                    context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(),
requestData, pbsCluster);
-                    clusters.put(key, pbsCluster);
                 }
             } catch (Exception e) {
                 throw new GFacException("An error occurred while creating GSI security context",
e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 959104e..ab508f9 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -29,6 +29,8 @@ import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
 import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -242,26 +244,26 @@ public class HPCPullMonitor extends PullMonitor {
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                             String outputDir = iMonitorID.getJobExecutionContext().getApplicationContext()
                                     .getApplicationDeploymentDescription().getType().getOutputDataDirectory();
-                            List<String> stdOut = connection.getCluster().listDirectory(outputDir);
// check the outputs directory
-                            if (stdOut.size() > 0) { // have to be careful with this
-                                for(int i=0;i<stdOut.size();i++) {
-                                    logger.info("--------------------------------------------------------------------------------------------");
-                                    logger.info("--------------------------------------------------------------------------------------------");
-                                    logger.info("--------------------------------------------------------------------------------------------");
-                                    logger.info("--------------------------------------------------------------------------------------------");
-                                    logger.info("--------------------------------------------------------------------------------------------");
-                                    logger.info("--------------------------------------------------------------------------------------------");
-                                    logger.info("--------------------------------------------------------------------------------------------");
-                                    logger.info(stdOut.get(i));
+                            List<String> stdOut = null;
+                            try {
+                                stdOut = connection.getCluster().listDirectory(outputDir);
// check the outputs directory
+                            } catch (SSHApiException e) {
+                                if (e.getMessage().contains("No such file or directory"))
{
+                                    // this is because while we run output handler something
failed and during exception
+                                    // we store all the jobs in the monitor queue again
+                                    logger.error("We know this  job is already attempted
to run out-handlers");
+                                    CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
                                 }
-                                completedJobs.put(iMonitorID.getJobName(), iMonitorID);
-                                logger.errorId(iMonitorID.getJobID(), "Job monitoring failed
{} times, removed job {} from " +
-                                                "monitor queue. Experiment {} , task {}",
iMonitorID.getFailedCount(),
-                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID());
-                            } else {
-                                iMonitorID.setFailedCount(0);
                             }
-                        } else {
+                                if (stdOut !=null && stdOut.size() > 0 &&
!stdOut.get(0).isEmpty()) { // have to be careful with this
+                                    completedJobs.put(iMonitorID.getJobName(), iMonitorID);
+                                    logger.errorId(iMonitorID.getJobID(), "Job monitoring
failed {} times, removed job {} from " +
+                                                    "monitor queue. Experiment {} , task
{}", iMonitorID.getFailedCount(),
+                                            iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+                                } else {
+                                    iMonitorID.setFailedCount(0);
+                                }
+                            } else {
                             // Evey
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
                             // if the job is complete we remove it from the Map, if any of
these maps
@@ -284,8 +286,8 @@ public class HPCPullMonitor extends PullMonitor {
             for (String jobName: keys) {
                 MonitorID completedJob = completedJobs.get(jobName);
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
-                gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
-//                GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac,
completedJob, publisher));
+                    gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
+//                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac,
completedJob, publisher));
                 if (zk == null) {
                     zk = completedJob.getJobExecutionContext().getZk();
                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index f4b2109..25f884d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -145,7 +145,8 @@ public class CommonUtils {
         }
         return true;
     }
-    public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,MonitorID
monitorID) throws AiravataMonitorException {
+
+    public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,
MonitorID monitorID) throws AiravataMonitorException {
         synchronized (queue) {
             Iterator<UserMonitorData> iterator = queue.iterator();
             while (iterator.hasNext()) {
@@ -153,19 +154,22 @@ public class CommonUtils {
                 if (next.getUserName().equals(monitorID.getUserName())) {
                     // then this is the right place to update
                     List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
-                    for (HostMonitorData iHostMonitorID : hostMonitorData) {
+                    Iterator<HostMonitorData> iterator1 = hostMonitorData.iterator();
+                    while (iterator1.hasNext()) {
+                        HostMonitorData iHostMonitorID = iterator1.next();
                         if (iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML()))
{
-                            List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
-                            for (MonitorID iMonitorID : monitorIDs) {
+                            Iterator<MonitorID> iterator2 = iHostMonitorID.getMonitorIDs().iterator();
+                            while (iterator2.hasNext()) {
+                                MonitorID iMonitorID = iterator2.next();
                                 if (iMonitorID.getJobID().equals(monitorID.getJobID())
                                         || iMonitorID.getJobName().equals(monitorID.getJobName()))
{
                                     // OK we found the object, we cannot do list.remove(object)
states of two objects
                                     // could be different, thats why we check the jobID
-                                    monitorIDs.remove(iMonitorID);
-                                    logger.infoId(monitorID.getJobID(), "Removed the job:
{} from monitoring last " +
-                                            "status:{}", monitorID.getJobID(), monitorID.getStatus().toString());
-                                    if (monitorIDs.size() == 0) {
-                                        hostMonitorData.remove(iHostMonitorID);
+                                    iterator2.remove();
+                                    logger.infoId(monitorID.getJobID(), "Removed the jobId:
{} JobName: {} from monitoring last " +
+                                            "status:{}", monitorID.getJobID(),monitorID.getJobName(),
monitorID.getStatus().toString());
+                                    if (iHostMonitorID.getMonitorIDs().size() == 0) {
+                                        iterator1.remove();
                                         logger.debug("Removed host {} from monitoring queue",
iHostMonitorID.getHost()
                                                 .getType().getHostAddress());
                                         if (hostMonitorData.size() == 0) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index a892984..929408d 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -354,7 +354,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 						// we recover one gfac node at a time
 						final WatchedEvent event = watchedEvent;
 						final OrchestratorServerHandler handler = this;
-						(new Thread() {
+						/*(new Thread() {  // disabling ft implementation with zk
 							public void run() {
 								int retry = 0;
 								while (retry < 3) {
@@ -373,7 +373,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 								}
 
 							}
-						}).start();
+						}).start();*/
 						break;
 					}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/tools/gsissh/pom.xml
----------------------------------------------------------------------
diff --git a/tools/gsissh/pom.xml b/tools/gsissh/pom.xml
index bef33c8..e82419b 100644
--- a/tools/gsissh/pom.xml
+++ b/tools/gsissh/pom.xml
@@ -32,12 +32,12 @@
 	</prerequisites>
 
 	<dependencies>
-		<dependency>
-			<groupId>com.jcraft</groupId>
-			<artifactId>jsch</artifactId>
-			<version>0.1.50</version>
-		</dependency>
-		<dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.50</version>
+        </dependency>
+        <dependency>
 			<groupId>org.jglobus</groupId>
 			<artifactId>myproxy</artifactId>
 			<version>${jglobus.version}</version>

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/tools/gsissh/src/main/java/com/jcraft/jsch/ExtendedSession.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/com/jcraft/jsch/ExtendedSession.java b/tools/gsissh/src/main/java/com/jcraft/jsch/ExtendedSession.java
index f3ead64..5d21a41 100644
--- a/tools/gsissh/src/main/java/com/jcraft/jsch/ExtendedSession.java
+++ b/tools/gsissh/src/main/java/com/jcraft/jsch/ExtendedSession.java
@@ -21,6 +21,8 @@
 
 package com.jcraft.jsch;
 
+import com.jcraft.jsch.*;
+
 import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
 
 public class ExtendedSession extends Session {

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/tools/gsissh/src/main/java/com/jcraft/jsch/GSISSHIdentityFile.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/com/jcraft/jsch/GSISSHIdentityFile.java b/tools/gsissh/src/main/java/com/jcraft/jsch/GSISSHIdentityFile.java
index 2a98c9c..8601973 100644
--- a/tools/gsissh/src/main/java/com/jcraft/jsch/GSISSHIdentityFile.java
+++ b/tools/gsissh/src/main/java/com/jcraft/jsch/GSISSHIdentityFile.java
@@ -23,13 +23,14 @@
 package com.jcraft.jsch;
 
 import java.io.*;
+import com.jcraft.jsch.*;
 
 /**
  * NOTE : This is class is directly created using com.jcraft.jsch.IdentityFile
  * IdentityFile has private access. Therefore to suit our requirements we modify IdentityFile
  * with public access.
  */
-public class GSISSHIdentityFile implements Identity{
+public class GSISSHIdentityFile implements Identity {
     private JSch jsch;
     private KeyPair kpair;
     private String identity;

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java
index 6f9a36d..e8f92b0 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/CommandExecutor.java
@@ -37,7 +37,7 @@ public class CommandExecutor {
     static {
         JSch.setConfig("gssapi-with-mic.x509", "org.apache.airavata.gsi.ssh.GSSContextX509");
         JSch.setConfig("userauth.gssapi-with-mic", "com.jcraft.jsch.UserAuthGSSAPIWithMICGSSCredentials");
-
+        JSch jSch = new JSch();
     }
 
     private static final Logger log = LoggerFactory.getLogger(CommandExecutor.class);
@@ -75,7 +75,7 @@ public class CommandExecutor {
             channel.connect();
         } catch (JSchException e) {
 
-//            channel.disconnect();
+            channel.disconnect();
 //            session.disconnect();
             throw new SSHApiException("Unable to retrieve command output. Command - " + command,
e);
         }
@@ -83,7 +83,7 @@ public class CommandExecutor {
 
         commandOutput.onOutput(channel);
         //Only disconnecting the channel, session can be reused
-//        channel.disconnect();
+        channel.disconnect();
         return session;
     }
 
@@ -253,7 +253,7 @@ public class CommandExecutor {
             channel.connect();
         } catch (JSchException e) {
 
-//            channel.disconnect();
+            channel.disconnect();
 //            session.disconnect();
 
             throw new SSHApiException("Unable to retrieve command output. Command - " + command
+
@@ -264,7 +264,7 @@ public class CommandExecutor {
 
         commandOutput.onOutput(channel);
 
-//        channel.disconnect();
+        channel.disconnect();
 //        session.disconnect();
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index ee4c0cb..bd66412 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -298,11 +298,26 @@ public class GSISSHAbstractCluster implements Cluster {
 
             //reusing submitBatchJobWithScript method to submit a job
             String jobID = null;
-            try {
-                jobID = this.submitBatchJobWithScript(tempPBSFile.getAbsolutePath(),
-                        jobDescriptor.getWorkingDirectory());
-            } catch (SSHApiException e) {
-                throw e;
+            int retry = 3;
+            while(retry>0) {
+                try {
+                    jobID = this.submitBatchJobWithScript(tempPBSFile.getAbsolutePath(),
+                            jobDescriptor.getWorkingDirectory());
+                    retry=0;
+                } catch (SSHApiException e) {
+                    retry--;
+                    if(retry==0) {
+                        throw e;
+                    }else{
+                        try {
+                            Thread.sleep(5000);
+                        } catch (InterruptedException e1) {
+                            e1.printStackTrace();
+                        }
+                        log.error("Error occured during job submission but doing a retry");
+                        e.printStackTrace();
+                    }
+                }
             }
             log.debug("Job has successfully submitted, JobID : " + jobID);
             if (jobID != null) {
@@ -414,6 +429,8 @@ public class GSISSHAbstractCluster implements Cluster {
                 if (retry == 0) {
                     throw new SSHApiException("Failed during scping local file:" + localFile
+ " to remote file "
                             + serverInfo.getHost() + ":rFile", e);
+                }else{
+                    log.error("Error performing scp but doing a retry");
                 }
             } catch (JSchException e) {
                 retry--;
@@ -426,6 +443,8 @@ public class GSISSHAbstractCluster implements Cluster {
                 if(retry==0) {
                     throw new SSHApiException("Failed during scping local file:" + localFile
+ " to remote file "
                             + serverInfo.getHost() + ":rFile", e);
+                }else{
+                    log.error("Error performing scp but doing a retry");
                 }
             }
         }
@@ -500,6 +519,7 @@ public class GSISSHAbstractCluster implements Cluster {
                 files = SSHUtils.listDirectory(directoryPath, session);
                 retry=0;
             } catch (IOException e) {
+                e.printStackTrace();
                 retry--;
                 try {
                     Thread.sleep(5000);

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
index 0ec9992..4846da8 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
@@ -53,9 +53,6 @@ public class StandardOutReader implements CommandOutput {
                 if (channel.isClosed()) {
                     break;
                 }
-                try {
-                } catch (Exception ignored) {
-                }
             }
             String output = pbsOutput.toString();
             this.setStdOutputString(output);

http://git-wip-us.apache.org/repos/asf/airavata/blob/42219ba5/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
index 2d2a4ca..1fdaa1f 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/util/SSHUtils.java
@@ -212,7 +212,7 @@ public class SSHUtils {
         if (stdOutReader.getStdErrorString().contains("scp:")) {
             throw new SSHApiException(stdOutReader.getStdErrorString());
         }
-//        channel.disconnect();
+        channel.disconnect();
     }
 
     /**
@@ -312,7 +312,7 @@ public class SSHUtils {
         stdOutReader.onOutput(channel);
 
 
-//        channel.disconnect();
+        channel.disconnect();
         if (stdOutReader.getStdErrorString().contains("scp:")) {
             throw new SSHApiException(stdOutReader.getStdErrorString());
         }
@@ -714,7 +714,7 @@ public class SSHUtils {
             channel.connect();
         } catch (JSchException e) {
 
-//            channel.disconnect();
+            channel.disconnect();
 //            session.disconnect();
 
             throw new SSHApiException("Unable to retrieve command output. Command - " + command
+
@@ -727,7 +727,7 @@ public class SSHUtils {
             throw new SSHApiException(stdOutReader.getStdErrorString());
         }
 
-//        channel.disconnect();
+        channel.disconnect();
     }
 
     public static List<String> listDirectory(String path, Session session) throws IOException,
JSchException, SSHApiException {
@@ -745,7 +745,7 @@ public class SSHUtils {
             channel.connect();
         } catch (JSchException e) {
 
-//            channel.disconnect();
+            channel.disconnect();
 //            session.disconnect();
 
             throw new SSHApiException("Unable to retrieve command output. Command - " + command
+
@@ -758,7 +758,7 @@ public class SSHUtils {
         if (stdOutReader.getStdErrorString().contains("ls:")) {
             throw new SSHApiException(stdOutReader.getStdErrorString());
         }
-//        channel.disconnect();
+        channel.disconnect();
         return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
     }
 


Mime
View raw message